package com.dyrnq.seckill.mq.redis;

import com.dyrnq.seckill.config.Constants;
import com.dyrnq.seckill.mq.MessageVo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.convert.MappingRedisConverter;
import org.springframework.data.redis.core.convert.RedisConverter;
import org.springframework.data.redis.core.mapping.RedisMappingContext;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.hash.ObjectHashMapper;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;

import java.util.Collections;

/**
 * Spring configuration that wires a Redis Streams consumer for {@link MessageVo} records.
 *
 * <p>Only active when the property {@code queue.impl} is set to {@code redis}. It registers the
 * object-mapping beans needed to convert stream entries into {@link MessageVo} instances and a
 * {@link StreamMessageListenerContainer} subscribed to {@link Constants#REDIS_STREAM_ORDERS_KEY}.
 */
@Configuration
@Slf4j
@ConditionalOnProperty(value = "queue.impl", havingValue = "redis")
public class RedisStreamConfig {

    /**
     * Creates a mapping context pre-seeded with {@link MessageVo} as its initial entity set.
     *
     * @return the mapping context used by the Redis converter
     */
    @Bean
    RedisMappingContext redisMappingContext() {
        RedisMappingContext ctx = new RedisMappingContext();
        ctx.setInitialEntitySet(Collections.singleton(MessageVo.class));
        return ctx;
    }

    /**
     * Creates the converter backing the {@link ObjectHashMapper}.
     *
     * @param redisMappingContext mapping context that knows about {@link MessageVo}
     * @return a {@link MappingRedisConverter} built on the given context
     */
    @Bean
    RedisConverter redisConverter(RedisMappingContext redisMappingContext) {
        return new MappingRedisConverter(redisMappingContext);
    }

    /**
     * Creates the hash mapper the listener container uses to map stream entries to objects.
     *
     * @param converter converter used for object/hash conversion
     * @return an {@link ObjectHashMapper} using the given converter
     */
    @Bean
    ObjectHashMapper hashMapper(RedisConverter converter) {
        return new ObjectHashMapper(converter);
    }

    /**
     * Runs the classpath Lua script {@code seckill_stream.lua} to prepare the stream and group.
     *
     * <p>The stream key, the consumer group name and the start id {@code "0"} are passed as script
     * arguments; no KEYS are supplied. The meaning of the numeric result is defined by the script,
     * which is not visible here, so the value is only logged and not interpreted.
     *
     * @param stringRedisTemplate template used to execute the script
     * @param consumerGroup       name of the consumer group handed to the script
     */
    private void createStream(StringRedisTemplate stringRedisTemplate, String consumerGroup) {
        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
        redisScript.setLocation(new ClassPathResource("seckill_stream.lua"));
        redisScript.setResultType(Long.class);

        Long result = stringRedisTemplate.execute(
                redisScript,
                Collections.emptyList(),
                Constants.REDIS_STREAM_ORDERS_KEY, consumerGroup, "0"
        );

        // execute(...) may return null; never unbox the result blindly.
        if (result == null) {
            log.warn("seckill_stream.lua returned no result for stream={} group={}",
                    Constants.REDIS_STREAM_ORDERS_KEY, consumerGroup);
        } else {
            log.info("seckill_stream.lua finished for stream={} group={} result={}",
                    Constants.REDIS_STREAM_ORDERS_KEY, consumerGroup, result);
        }
    }

    /**
     * Builds the listener container and subscribes {@code redisStreamListener} to the orders stream.
     *
     * <p>The container is started and stopped through the bean's {@code initMethod} and
     * {@code destroyMethod}. Before subscribing, {@link #createStream} is invoked so the stream
     * and group are prepared by the Lua script.
     *
     * @param stringRedisTemplate template used to run the setup script
     * @param connectionFactory   connection factory the container polls with
     * @param hashMapper          mapper converting stream entries into {@link MessageVo}
     * @param redisStreamListener listener bean named {@code redisStreamListener}
     * @param consumerName        consumer name, property {@code redis.stream.consumer.name}
     *                            (default {@code my-consumer})
     * @param consumerGroup       consumer group, property {@code redis.stream.consumer.group}
     *                            (default {@code my-group})
     * @return the configured container with the subscription registered
     */
    // The bean and listener types are kept raw so existing injection points and the listener
    // bean (declared elsewhere) keep matching exactly as before.
    @SuppressWarnings({"rawtypes", "unchecked"})
    @Bean(initMethod = "start", destroyMethod = "stop")
    StreamMessageListenerContainer streamMessageListenerContainer(
            StringRedisTemplate stringRedisTemplate,
            RedisConnectionFactory connectionFactory,
            ObjectHashMapper hashMapper,
            @Qualifier("redisStreamListener") StreamListener redisStreamListener,
            @Value("${redis.stream.consumer.name:my-consumer}") String consumerName,
            @Value("${redis.stream.consumer.group:my-group}") String consumerGroup) {
        log.info("create streamMessageListenerContainer consumerName={}", consumerName);

        createStream(stringRedisTemplate, consumerGroup);

        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, MessageVo>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                .objectMapper(hashMapper)
                .targetType(MessageVo.class)
                .build();

        StreamMessageListenerContainer<String, ObjectRecord<String, MessageVo>> container = StreamMessageListenerContainer.create(connectionFactory, options);

        // Read from the orders stream starting at the group's last-consumed offset.
        StreamOffset<String> offset = StreamOffset.create(Constants.REDIS_STREAM_ORDERS_KEY, ReadOffset.lastConsumed());
        // Identify this reader by its group and consumer name.
        Consumer consumer = Consumer.from(consumerGroup, consumerName);
        StreamMessageListenerContainer.StreamReadRequest<String> streamReadRequest = StreamMessageListenerContainer.StreamReadRequest.builder(offset)
                // Errors must be visible: the subscription is never cancelled (see below),
                // so a silent handler would hide every polling/listener failure.
                .errorHandler(error -> log.error(
                        "redis stream read failed stream={} group={} consumer={}",
                        Constants.REDIS_STREAM_ORDERS_KEY, consumerGroup, consumerName, error))
                // Keep the subscription alive regardless of the error.
                .cancelOnError(e -> false)
                .consumer(consumer)
                .autoAcknowledge(true)
                .build();
        // Attach the listener to the read request.
        container.register(streamReadRequest, redisStreamListener);

        return container;
    }
}
